 package com.longge.common.util;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.Range;
import org.springframework.data.domain.Range.Bound;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveListOperations;
import org.springframework.data.redis.core.ReactiveRedisCallback;
import org.springframework.data.redis.core.ReactiveSetOperations;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveZSetOperations;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.ZSetOperations;

import lombok.NonNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author roger yang
 * @date 10/31/2019
 */
public class ReactiveRedisUtils {
    private static ReactiveStringRedisTemplate redisTemplate;
    private static ReactiveHashOperations<String, String, String> hashOps;
    private static ReactiveZSetOperations<String, String> zsetOps;
    private static ReactiveSetOperations<String, String> setOps;
    private static ReactiveListOperations<String, String> listOps;
    
    private ReactiveRedisUtils() {}
    
    public static ReactiveStringRedisTemplate getStringRedisTemplate() {
        return redisTemplate;
    }
    
    public static void setRedisTemplate(ReactiveStringRedisTemplate rt) {
        redisTemplate = rt;
        hashOps = redisTemplate.opsForHash();
        zsetOps = redisTemplate.opsForZSet();
        setOps = redisTemplate.opsForSet();
        listOps = redisTemplate.opsForList();
    }
    
    /**
     * ###################################################################### common command  #################################################################################
     */
    /**
     * 是否存在key
     * @param key
     * @return
     */
    public static Mono<Boolean> hasKey(@NonNull String key) {
        return redisTemplate.hasKey(key);
    }
    
    /**
     * 设置过期时间
     * @param key
     * @param timeout
     * @return
     */
    public static Mono<Boolean> expire(@NonNull String key, @NonNull Duration timeout) {
        return redisTemplate.expire(key, timeout);
    }
    
    /**
     * 设置过期的截止日期
     * @param key
     * @param expireAt
     * @return
     */
    public static Mono<Boolean> expireAt(@NonNull String key, Instant expireAt) {
        return redisTemplate.expireAt(key, expireAt);
    }
    
    /**
     * 获取剩余过期时间
     * @param key
     * @return
     */
    public static Mono<Duration> getExpire(@NonNull String key) {
        return redisTemplate.getExpire(key);
    }
    
    /**
     * 删除key
     * @param key
     * @return
     */
    public static Mono<Long> delete(@NonNull String key) {
        return redisTemplate.delete(key);
    }
    
    /**
     * 批量删除key
     * @param keys
     * @return
     */
    public static Mono<Long> deleteBatch(@NonNull String... keys) {
        return redisTemplate.delete(keys);
    }
    
    /**
     * 批量删除key
     * @param keys
     * @return
     */
    public static Mono<Long> deleteBatch(Collection<String> keys) {
    	String[] arr = new String[keys.size()];
        return redisTemplate.delete(keys.toArray(arr));
    }
    
    /**
     * 重命名key的名字到newKey，如果已经存在则覆盖
     * @param oldKey
     * @param newKey
     */
    public static Mono<Boolean> rename(@NonNull String oldKey,String newKey) {
        return redisTemplate.rename(oldKey, newKey);
    }
    
    /**
     * 如果newKey不存在则重命名
     * @param oldKey
     * @param newKey
     * @return
     */
    public static Mono<Boolean> renameIfAbsent(@NonNull String oldKey,String newKey) {
        return redisTemplate.renameIfAbsent(oldKey, newKey);
    }
    
    /**
     * 查询key的redis类型
     * @param key
     * @return
     */
    public static Mono<DataType> type(@NonNull String key) {
        return redisTemplate.type(key);
    }
    
    /**
     * 生成一个随机key
     * @return
     */
    public static Mono<String> randomKey() {
        return redisTemplate.randomKey();
    }
    /**
     * ###################################################################### string  #################################################################################
     */
    
    public static class StringOps {
        /**
         * 获取指定 key 的值
         * @param key
         * @return
         */
        public static Mono<String> get(@NonNull String key) {
            return redisTemplate.opsForValue().get(key);
        }

        /**
         * 设置指定 key 的值value
         * @param key
         * @param value
         */
        public static Mono<Boolean> set(@NonNull String key, @NonNull String value) {
            return redisTemplate.opsForValue().set(key, value);
        }

        /**
         * 为给定 key设置值并设置过期时间expiredTime
         * @param key
         * @param value
         * @param timeout
         */
        public static Mono<Boolean> setEx(@NonNull String key, @NonNull String value, @NonNull Duration timeout) {
            return redisTemplate.opsForValue().set(key, value, timeout);
        }

        /**
         * 将给定 key 的值设为 value ，并返回 key 的旧值(old value)
         * @param key
         * @param value
         * @return
         */
        public static Mono<String> getAndSet(@NonNull String key, @NonNull String value) {
            return redisTemplate.opsForValue().getAndSet(key, value);
        }

        /**
         * 将 key 中储存的数字值增1,返回新值
         * @param key
         * @return
         */
        public static Mono<Long> increment(@NonNull String key) {
            return incrementByCount(key, 1);
        }

        /**
         * 将 key 中储存的数字值增count,返回新值
         * @param key
         * @param count
         * @return
         */
        public static Mono<Long> incrementByCount(@NonNull String key, long count) {
            return redisTemplate.opsForValue().increment(key, count);
        }
        
        /**
         * 将 key 中储存的数字值减1,返回新值
         * @param key
         * @return
         */
        public static Mono<Long> decrement(@NonNull String key) {
            return decrementByCount(key, 1);
        }

        /**
         * 将 key 中储存的数字值减count,返回新值
         * @param key
         * @param count
         * @return
         */
        public static Mono<Long> decrementByCount(@NonNull String key, long count) {
            return redisTemplate.opsForValue().decrement(key, count);
        }
        
        /**
         * 批量获取元素
         * @param keys
         * @return
         */
        public static Mono<List<String>> mulitiGet(Collection<String> keys) {
            return redisTemplate.opsForValue().multiGet(keys);
        }
        
        /**
         * 批量设置值
         * @param map
         */
        public static Mono<Boolean> mulitiSet(Map<String, String> map) {
            return redisTemplate.opsForValue().multiSet(map);
        }
    }

    /**
     * ###################################################################### list  #################################################################################
     */
    
    public static class ListOps {
        /**
         * 获取指定index的值
         * @param key
         * @param index
         * @return
         */
        public static Mono<String> index(@NonNull String key, long index) {
            return listOps.index(key, index);
        }
        
        /**
         * 对index索引的值进行修改
         * @param key
         * @param index
         * @param value
         */
        public static Mono<Boolean> set(@NonNull String key, long index, @NonNull String value) {
            return listOps.set(key, index, value);
        }
        
        /**
         * 从左边(列表头部)push数据
         * @param key
         * @param value
         */
        public static Mono<Long> leftPush(@NonNull String key, @NonNull String value) {
            return listOps.leftPush(key, value);
        }
        
        /**
         * 从左边（头部）push数据并设置过期时间
         * @param key
         * @param value
         * @param timeout
         */
        public static Flux<Void> leftPush(@NonNull String key, @NonNull String value, @NonNull Duration timeout) {
            return redisTemplate.execute(new ReactiveRedisCallback<Void>() {
                @Override
                public Publisher<Void> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
                    connection.listCommands().lPushX(ByteBuffer.wrap(key.getBytes()), ByteBuffer.wrap(value.getBytes()));
                    connection.keyCommands().expire(ByteBuffer.wrap(key.getBytes()), timeout);
                    return (s) -> s.onComplete();
                }});
        }

        /**
         * 从左边(列表头部)批量push数据
         * @param key
         * @param values
         */
        public static Mono<Long> leftPushAll(@NonNull String key, @NonNull String... values) {
            return listOps.leftPushAll(key, values);
        }

        /**
         * 从左边(列表头部)批量push数据
         * @param key
         * @param list
         */
        public static Mono<Long> leftPushAll(@NonNull String key, @NonNull Collection<String> list) {
            return listOps.leftPushAll(key, list);
        }
        
        /**
         * 当key的list存在时才从左边（头部）push数据
         * @param key
         * @param value
         * @return
         */
        public static Mono<Long> leftPushIfPresent(@NonNull String key, @NonNull String value) {
            return listOps.leftPushIfPresent(key, value);
        }

        /**
         * 从右边（列表尾部）push数据
         * @param key
         * @param value
         */
        public static Mono<Long> rightPush(@NonNull String key, @NonNull String value) {
            return listOps.rightPush(key, value);
        }
        
        /**
         * 从右边（尾部）push数据并设置过期时间
         * @param key
         * @param value
         * @param timeout
         */
        public static Flux<Void> rightPush(@NonNull String key, @NonNull String value, @NonNull Duration timeout) {
            return redisTemplate.execute(new ReactiveRedisCallback<Void>() {
                @Override
                public Publisher<Void> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
                    connection.listCommands().rPushX(ByteBuffer.wrap(key.getBytes()), ByteBuffer.wrap(value.getBytes()));
                    connection.keyCommands().expire(ByteBuffer.wrap(key.getBytes()), timeout);
                    return (s) -> s.onComplete();
                }});
        }

        /**
         * 从右边（列表尾部）批量push数据
         * @param key
         * @param values
         */
        public static Mono<Long> rightPushAll(@NonNull String key, @NonNull String... values) {
            return listOps.rightPushAll(key, values);
        }

        /**
         * 从右边（列表尾部）批量push数据
         * @param key
         * @param list
         */
        public static Mono<Long> rightPushAll(@NonNull String key, @NonNull Collection<String> list) {
            return listOps.rightPushAll(key, list);
        }

        /**
         * 只有key存在的时候才从右边（尾部）push数据
         * @param key
         * @param value
         * @return
         */
        public static Mono<Long> rightPushIfPresent(@NonNull String key, @NonNull String value) {
            return listOps.rightPushIfPresent(key, value);
        }
        
        /**
         * 从左边(列表头部)移除一个数据
         * @param key
         * @return
         */
        public static Mono<String> leftPop(@NonNull String key) {
            return listOps.leftPop(key);
        }

        /**
         * 从左边(列表头部)移除一个元素，如果没有值则阻塞expiredTime时间
         * @param key
         * @param timeoutTimeUnit
         * @param timeout
         * @return
         */
        public static Mono<String> leftPop(@NonNull String key, @NonNull Duration timeout) {
            return listOps.leftPop(key, timeout);
        }
        
        /**
         * 把sourceKey队尾（右边）的元素弹出并压入destinationKey的头部（左边）
         * @param sourceKey
         * @param destinationKey
         */
        public static Flux<String> leftPopAndRightPush(@NonNull String sourceKey, @NonNull String destinationKey) {
        	return redisTemplate.execute(new ReactiveRedisCallback<String>() {
                @Override
                public Publisher<String> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
                	Mono<ByteBuffer> value = connection.listCommands().lPop(ByteBuffer.wrap(sourceKey.getBytes()));
                    if(!value.hasElement().block()) {
                    	return (s) -> {
                    		s.onComplete();
                    	};
                    }
                    connection.listCommands().rPushX(ByteBuffer.wrap(destinationKey.getBytes()), value.block());
                    return (s) -> {
                		s.onNext(value.block().toString());
                		s.onComplete();
                	};
                }
        	});
        }

        /**
         * 从右边（列表尾部）移除一个数据
         * @param key
         * @return
         */
        public static Mono<String> rightPop(@NonNull String key) {
            return listOps.rightPop(key);
        }

        /**
         * 从右边（列表尾部）移除一个元素，如果没有值则阻塞timeout时间
         * @param key
         * @param timeout
         * @return
         */
        public static Mono<String> rightPop(@NonNull String key, @NonNull Duration timeout) {
            return listOps.rightPop(key, timeout);
        }
        
        /**
         * 把sourceKey队尾（右边）的元素弹出并压入destinationKey的头部（左边）
         * @param sourceKey
         * @param destinationKey
         */
        public static Mono<String> rightPopAndLeftPush(@NonNull String sourceKey, @NonNull String destinationKey) {
            return listOps.rightPopAndLeftPush(sourceKey, destinationKey);
        }
        
        /**
         * 把sourceKey队尾（右边）的元素弹出并压入destinationKey的头部（左边）, 如果没有元素可以阻塞timeout
         * @param sourceKey
         * @param destinationKey
         * @param timeout
         */
        public static Mono<String> rightPopAndLeftPush(@NonNull String sourceKey, @NonNull String destinationKey, @NonNull Duration timeout) {
            return listOps.rightPopAndLeftPush(sourceKey, destinationKey, timeout);
        }

        /**
         * 获取列表长度
         * @param key
         * @return
         */
        public static Mono<Long> size(@NonNull String key) {
            return listOps.size(key);
        }

        /**
         * 获取列表list的所有元素
         * @param key
         * @return
         */
        public static Flux<String> all(@NonNull String key) {
            return listOps.range(key, 0, -1);
        }

        /**
         * 获取指定范围的list的数据
         * @param key
         * @param start
         * @param end
         * @return
         */
        public static Flux<String> range(@NonNull String key, long start, long end) {
            return listOps.range(key, start, end);
        }

        /**
         * 截取留下指定下标内的数据
         * @param key
         * @param start  从0开始，包含此下标
         * @param end  从0开始，包含此下标， -1表示到列尾
         */
        public static Mono<Boolean> trim(@NonNull String key, long start, long end) {
            return listOps.trim(key, start, end);
        }
    }

    /**
     * ###################################################################### hash  #################################################################################
     */
    
    public static class HashOps {
        /**
         * 获取存储在哈希表key中指定字段hashKey的值
         * @param key
         * @param hashKey
         * @return
         */
        public static Mono<String> get(@NonNull String key, @NonNull String hashKey) {
            return hashOps.get(key, hashKey);
        }

        /**
         * 获取哈希表key的所有字段和值
         * @param key
         * @return
         */
        public static Flux<Map.Entry<String, String>> getAll(@NonNull String key) {
            return hashOps.entries(key);
        }

        /**
         * 批量获取哈希表key指定的字段的值
         * @param key
         * @param hashKeys
         * @return
         */
        public static Mono<List<String>> multiGet(@NonNull String key, @NonNull Collection<String> hashKeys) {
            return hashOps.multiGet(key, hashKeys);
        }

        /**
         * 将哈希表 key 中的字段 hashKey 的值设为 value
         * @param key
         * @param hashKey
         * @param value
         */
        public static Mono<Boolean> put(@NonNull String key, @NonNull String hashKey, @NonNull String value) {
            return hashOps.put(key, hashKey, value);
        }
        
        /**
         * 设值并设置过期时间
         * @param key
         * @param hashKey
         * @param value
         * @param timeout
         */
        public static Flux<String> put(@NonNull String key,String hashKey, @NonNull String value, @NonNull Duration timeout) {
            return redisTemplate.execute(new ReactiveRedisCallback<String>() {
                @Override
                public Publisher<String> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
                    connection.hashCommands().hSet(ByteBuffer.wrap(key.getBytes()), ByteBuffer.wrap(hashKey.getBytes()), ByteBuffer.wrap(value.getBytes()));
                    connection.keyCommands().expire(ByteBuffer.wrap(key.getBytes()), timeout);
                    return (s) -> {
                    	s.onNext(value);
                    	s.onComplete();
                    };
                }
            });
        }

        /**
         * 当hashKey不存在时，将key 中的字段 hashKey 的值设为 value
         * @param key
         * @param hashKey
         * @param value
         */
        public static Mono<Boolean> putIfAbsent(@NonNull String key, @NonNull String hashKey, @NonNull String value) {
            return hashOps.putIfAbsent(key, hashKey, value);
        }

        /**
         * 同时将多个 hashKey-value (域-值)对设置到哈希表 key 中
         * @param key
         * @param map
         */
        public static Mono<Boolean> putAll(@NonNull String key, @NonNull Map<String, String> map) {
            return hashOps.putAll(key, map);
        }
        
        /**
         * 批量设值并设置过期时间
         * @param key
         * @param map
         * @param timeout
         */
        public static Flux<Void> putAll(@NonNull String key, @NonNull Map<String, String> map, @NonNull Duration timeout) {
            Map<ByteBuffer, ByteBuffer> hashes = new HashMap<>();
            map.entrySet().forEach(item -> {
                hashes.put(ByteBuffer.wrap(item.getKey().getBytes()), ByteBuffer.wrap(item.getValue().getBytes()));
            });
            return redisTemplate.execute(new ReactiveRedisCallback<Void>() {
                @Override
                public Publisher<Void> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
                	connection.hashCommands().hMSet(ByteBuffer.wrap(key.getBytes()), hashes);
                	connection.keyCommands().expire(ByteBuffer.wrap(key.getBytes()), timeout);
                    return (s) -> s.onComplete();
                }
            });
        }

        /**
         * 获取哈希表key中的所有字段hashKey
         * @param key
         * @return
         */
        public static Flux<String> keys(@NonNull String key) {
            return hashOps.keys(key);
        }

        /**
         * 获取哈希表key中所有值value
         * @param key
         * @return
         */
        public static Flux<String> values(@NonNull String key) {
            return hashOps.values(key);
        }

        /**
         * 获取哈希表中字段的数量
         * @param key
         * @return
         */
        public static Mono<Long> size(@NonNull String key) {
            return hashOps.size(key);
        }

        /**
         * 删除哈希表key的字段 hashKeys
         * @param key
         * @param hashKeys
         */
        public static Mono<Long> delete(@NonNull String key, @NonNull Object... hashKeys) {
            return hashOps.remove(key, hashKeys);
        }

        /**
         * 查看哈希表 key 中，指定的字段hashKey是否存在
         * @param key
         * @param hashKey
         * @return
         */
        public static Mono<Boolean> hasKey(@NonNull String key, @NonNull String hashKey) {
            return hashOps.hasKey(key, hashKey);
        }
        
        /**
         * 对hashKey的值加1， 返回新值
         * @param key
         * @param hashKey
         * @return 
         */
        public static Mono<Long> increment(@NonNull String key, @NonNull String hashKey) {
            return hashOps.increment(key, hashKey, 1L);
        }
    }
    
    /**
     * ###################################################################### set  #################################################################################
     */

    public static class SetOps {
        /**
         * 向集合set添加一个或多个成员
         * @param key
         * @param values
         */
        public static Mono<Long> add(@NonNull String key, @NonNull String... values) {
            return setOps.add(key, values);
        }
        
        /**
         * 向set中添加元素并设置过期时间
         * @param key
         * @param timeout
         * @param value
         */
        public static Flux<Long> add(@NonNull String key, @NonNull Duration timeout, @NonNull String... values) {
        	List<ByteBuffer> coll = Arrays.asList(values).stream().map(item -> ByteBuffer.wrap(item.getBytes())).collect(Collectors.toList());
            return redisTemplate.execute(new ReactiveRedisCallback<Long>() {
                @Override
                public Publisher<Long> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
                	Mono<Long> addResult = connection.setCommands().sAdd(ByteBuffer.wrap(key.getBytes()), coll);
                    connection.keyCommands().expire(ByteBuffer.wrap(key.getBytes()), timeout);
                    return (s) -> {
                    	if(addResult.hasElement().block()) {
                    		s.onNext(addResult.block());
                    	}
                    	s.onComplete();
                    };
                }
            });
        }
        
        /**
         * 获取集合set的成员数
         * @param key
         * @return
         */
        public static Mono<Long> size(@NonNull String key) {
            return setOps.size(key);
        }
    
        /**
         * 返回集合set中的所有成员
         * @param key
         * @return
         */
        public static Flux<String> members(@NonNull String key) {
            return setOps.members(key);
        }
    
        /**
         * 判断 value 元素是否是集合set的成员
         * @param key
         * @param value
         * @return
         */
        public static Mono<Boolean> contain(@NonNull String key, @NonNull String value) {
            return setOps.isMember(key, value);
        }
    
        /**
         * 移除并返回集合set中的一个随机元素
         * @param key
         * @return
         */
        public static Mono<String> pop(@NonNull String key) {
            return setOps.pop(key);
        }
        
        /**
         * 一次随机移除count个数量的元素
         * @param key
         * @param count
         * @return
         */
        public static Flux<String> pop(@NonNull String key, long count) {
            return setOps.pop(key, count);
        }
    
        /**
         * 移除集合中一个或多个成员，并返回移除的数量
         * @param key
         * @param values
         * @return
         */
        public static Mono<Long> removeValues(@NonNull String key, @NonNull Object... values) {
            return setOps.remove(key, values);
    
        }
    
        /**
         * 返回集合set中1个随机数，不从集合set移除
         * @param key
         * @return
         */
        public static Mono<String> random(@NonNull String key) {
            return setOps.randomMember(key);
        }
    
        /**
         * 返回集合set中count个随机数，不从集合set移除
         * @param key
         * @param count
         * @return 
         */
        public static Flux<String> multiRandom(@NonNull String key, int count) {
            return setOps.randomMembers(key, count);
        }
        
        /**
         * scan某个set，返回最多count个，匹配规则为pattern
         * @param key
         * @param count
         * @param pattern eg: *test*表示包含test字符串
         * @return
         */
        public static Flux<String> scan(@NonNull String key, long count, @NonNull String pattern) {
            return setOps.scan(key, ScanOptions.scanOptions().count(count).match(pattern).build());
        }
        
        /**
         * -------------------------------差集---------------------------
         */
        
        /**
         * 比较2个set的差集
         * @param key
         * @param otherKey
         * @return
         */
        public static Flux<String> difference(@NonNull String key, @NonNull String otherKey) {
            return setOps.difference(key, otherKey);
        }
        
        /**
         * 比较2个set的差集，并存储到destKey的set中
         * @param key
         * @param otherKey
         * @param destKey
         * @return
         */
        public static Mono<Long> differenceAndStore(@NonNull String key, @NonNull String otherKey, @NonNull String destKey) {
            return setOps.differenceAndStore(key, otherKey, destKey);
        }
        
        /**
         * 比较多个set的差集
         * @param key
         * @param otherKeys
         * @return
         */
        public static Flux<String> difference(@NonNull String key, @NonNull Collection<String> otherKeys) {
            return setOps.difference(key, otherKeys);
        }
        
        /**
         * 比较多个set的差集，并存储到destKey的set中
         * @param key
         * @param otherKeys
         * @param destKey
         * @return
         */
        public static Mono<Long> differenceAndStore(@NonNull String key, @NonNull Collection<String> otherKeys, @NonNull String destKey) {
            return setOps.differenceAndStore(key, otherKeys, destKey);
        }
        /**
         * -------------------------------差集---------------------------
         */
        /**
         * -------------------------------交集---------------------------
         */
        
        /**
         * 比较2个set的交集
         * @param key
         * @param otherKey
         * @return
         */
        public static Flux<String> intersect(@NonNull String key, @NonNull String otherKey) {
            return setOps.intersect(key, otherKey);
        }
        
        /**
         * 比较2个set的交集，并存储到destKey的set中
         * @param key
         * @param otherKey
         * @param destKey
         * @return
         */
        public static Mono<Long> intersectAndStore(@NonNull String key, @NonNull String otherKey, @NonNull String destKey) {
            return setOps.intersectAndStore(key, otherKey, destKey);
        }
        
        /**
         * 比较多个set的交集
         * @param key
         * @param otherKeys
         * @return
         */
        public static Flux<String> intersect(@NonNull String key, @NonNull Collection<String> otherKeys) {
            return setOps.intersect(key, otherKeys);
        }
        
        /**
         * 比较多个set的交集，并存储到destKey的set中
         * @param key
         * @param otherKeys
         * @param destKey
         * @return
         */
        public static Mono<Long> intersectAndStore(@NonNull String key, @NonNull Collection<String> otherKeys, @NonNull String destKey) {
            return setOps.intersectAndStore(key, otherKeys, destKey);
        }
        /**
         * -------------------------------交集---------------------------
         */
        /**
         * -------------------------------并集---------------------------
         */
        /**
         * 比较2个set的并集
         * @param key
         * @param otherKey
         * @return
         */
        public static Flux<String> union(@NonNull String key, @NonNull String otherKey) {
            return setOps.union(key, otherKey);
        }
        
        /**
         * 比较2个set的并集，并存储到destKey的set中
         * @param key
         * @param otherKey
         * @param destKey
         * @return
         */
        public static Mono<Long> unionAndStore(@NonNull String key, @NonNull String otherKey, @NonNull String destKey) {
            return setOps.unionAndStore(key, otherKey, destKey);
        }
        
        /**
         * 比较多个set的并集
         * @param key
         * @param otherKeys
         * @return
         */
        public static Flux<String> union(@NonNull String key, @NonNull Collection<String> otherKeys) {
            return setOps.union(key, otherKeys);
        }
        
        /**
         * 比较多个set的并集，并存储到destKey的set中
         * @param key
         * @param otherKeys
         * @param destKey
         * @return
         */
        public static Mono<Long> unionAndStore(@NonNull String key, @NonNull Collection<String> otherKeys, @NonNull String destKey) {
            return setOps.unionAndStore(key, otherKeys, destKey);
        }
        /**
         * -------------------------------并集---------------------------
         */
    }

    /**
     * ###################################################################### zset  #################################################################################
     */
    public static class ZSetOps {
        /**
         * 向有序集合zset添加一个成员，或者更新已存在成员的分数
         * @param key
         * @param value
         * @param score
         */
        public static Mono<Boolean> add(@NonNull String key, @NonNull String value, double score) {
            return zsetOps.add(key, value, score);
        }
    
        /**
         * 获取有序集合zset的成员数
         * @param key
         * @return
         */
        public static Mono<Long> size(@NonNull String key) {
            return zsetOps.size(key);
        }
    
        /**
         * 获取有序集合zset中指定分数区间的成员数, min<= score <=max
         * @param key
         * @param min 从0开始
         * @param max 从0开始
         * @return
         */
        public static Mono<Long> countBetweenMinMax(@NonNull String key, double min, double max) {
        	Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            return zsetOps.count(key, range);
        }
        
        /**
         * 增加一名成员在排序设置的评分,如果成员不存在，则直接add
         * @param key
         * @param increment
         * @param member
         * @return
         */
        public static Mono<Double> incrementScore(@NonNull String key, double increment, @NonNull String member) {
            return zsetOps.incrementScore(key, member, increment);
        }
        
        /**
         * 通过索引区间返回有序集合zset指定区间内的成员，分数从低到高
         * @param key
         * @param start 索引从0开始，包含start
         * @param end 索引从0开始，包含end， -1表示到末尾
         * @return
         */
        public static Flux<String> rangeByIndex(@NonNull String key, long start, long end) {
        	Range<Long> range = Range.from(Bound.inclusive(start)).to(Bound.inclusive(end));
            return zsetOps.range(key, range);
        }
    
        /**
         * 通过索引区间返回有序集合zset指定区间内的成员且带分数，分数从低到高
         * @param key
         * @param start 索引从0开始，包含start
         * @param end 索引从0开始，包含end， -1表示到末尾
         * @return
         */
        public static Flux<ZSetOperations.TypedTuple<String>> rangeByIndexWithScores(@NonNull String key, long start, long end) {
        	Range<Long> range = Range.from(Bound.inclusive(start)).to(Bound.inclusive(end));
            return zsetOps.rangeWithScores(key, range);
        }
    
        /**
         * 通过索引区间返回有序集合zset指定区间内的成员，分数从高到低
         * @param key
         * @param start 索引从0开始，包含start
         * @param end 索引从0开始，包含end， -1表示到末尾
         * @return
         */
        public static Flux<String> reverseRangeByIndex(@NonNull String key, long start, long end) {
        	Range<Long> range = Range.from(Bound.inclusive(start)).to(Bound.inclusive(end));
            return zsetOps.reverseRange(key, range);
        }
    
        /**
         * 通过索引区间返回有序集合zset指定区间内的成员且带分数，分数从高到低
         * @param key
         * @param start 索引从0开始，包含start
         * @param end 索引从0开始，包含end， -1表示到末尾
         * @return
         */
        public static Flux<ZSetOperations.TypedTuple<String>> reverseRangeByIndexWithscores(@NonNull String key, long start, long end) {
        	Range<Long> range = Range.from(Bound.inclusive(start)).to(Bound.inclusive(end));
            return zsetOps.reverseRangeWithScores(key, range);
        }
    
        /**
         * 通过分数返回有序集合zset指定区间内的成员, 分数从低到高
         * @param key
         * @param min
         * @param max
         * @return
         */
        public static Flux<String> rangeByScore(@NonNull String key, double min, double max) {
        	Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            return zsetOps.rangeByScore(key, range);
        }
    
        /**
         * 通过分数返回有序集合zset指定区间内的成员且带分数, 分数从低到高
         * @param key
         * @param min
         * @param max
         * @return
         */
        public static Flux<ZSetOperations.TypedTuple<String>> rangeByScoreWithscores(@NonNull String key, double min, double max) {
        	Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            return zsetOps.rangeByScoreWithScores(key, range);
        }
    
        /**
         * 通过分数返回有序集合zset指定区间内的成员, 分数从高到低
         * @param key
         * @param min
         * @param max
         * @return
         */
        public static Flux<String> reverseRangeByScore(@NonNull String key, double min, double max) {
        	Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            return zsetOps.reverseRangeByScore(key, range);
        }
    
        /**
         * 通过分数返回有序集合zset指定区间内的成员且带分数, 分数从高到低
         * @param key
         * @param min
         * @param max
         * @return
         */
        public static Flux<ZSetOperations.TypedTuple<String>> reverseRangeByScoreWithscores(@NonNull String key, double min, double max) {
        	Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            return zsetOps.reverseRangeByScoreWithScores(key, range);
        }
    
        /**
         * 返回有序集合zset中指定成员的索引
         * @param key
         * @param value
         * @return
         */
        public static Mono<Long> index(@NonNull String key, @NonNull Object value) {
            return zsetOps.rank(key, value);
        }
    
        /**
         * 移除有序集合zset中的一个或多个成员，并返回移除成员的个数
         * @param key
         * @param values
         */
        public static Mono<Long> remove(@NonNull String key, @NonNull Object... values) {
            return zsetOps.remove(key, values);
        }
    
        /**
         * 移除有序集合zset给定的索引区间的所有成员，并返回移除成员的个数
         * @param key
         * @param start 索引从0开始，包含start
         * @param end 索引从0开始，包含end， -1表示到末尾
         * @return
         */
        public static Mono<Long> removeRangeByRank(@NonNull String key, long start, long end) {
        	Range<Long> range = Range.from(Bound.inclusive(start)).to(Bound.inclusive(end));
            return zsetOps.removeRange(key, range);
        }
    
        /**
         * 移除有序集合zset给定的分数区间的所有成员，并返回移除成员的个数
         * @param key
         * @param min
         * @param max
         * @return
         */
        public static Mono<Long> removeRangeByScore(@NonNull String key, double min, double max) {
        	Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            return zsetOps.removeRangeByScore(key, range);
        }
        
        /**
         * 返回元素的分数，如果元素不存在则返回null
         * @param key
         * @param member
         * @return
         */
        public static Mono<Double> score(@NonNull String key, @NonNull String member) {
            return zsetOps.score(key, member);
        }
        
        /**
         * 将key和otherKey的数据合并(并集)到新的destKey里
         * @param key
         * @param otherKey
         * @param destKey
         * @return
         */
        public static Mono<Long> unionAndStore(@NonNull String key, @NonNull String otherKey, @NonNull String destKey) {
            return zsetOps.unionAndStore(key, otherKey, destKey);
        }
        
        /**
         * 将key和otherKey的数据的交集存储到新的destKey里
         * @param key
         * @param otherKey
         * @param destKey
         * @return
         */
        public static Mono<Long> intersectAndStore(@NonNull String key, @NonNull String otherKey, @NonNull String destKey) {
            return zsetOps.intersectAndStore(key, otherKey, destKey);
        }
        
        /**
         * 获取第一个值
         * @param key
         * @return
         */
        public static Mono<String> getFirst(@NonNull String key) {
        	Range<Long> range = Range.from(Bound.inclusive(0L)).to(Bound.inclusive(0L));
            Flux<String> set = zsetOps.range(key, range);
            if(!set.hasElements().block()) {
                return Mono.empty();
            }
            return Mono.just(set.blockFirst());
        }
        
        /**
         * 获取下一个currentScore的值（仅限score不重复的场景）
         * @param key
         * @param currentScore
         * @return
         */
        public static Mono<String> getNextValue(@NonNull String key, double currentScore) {
            double min = currentScore + 1;
            double max = currentScore+100;
            Range<Double> range = Range.from(Bound.inclusive(min)).to(Bound.inclusive(max));
            Flux<String> set = zsetOps.rangeByScore(key, range);
            if(!set.hasElements().block()) {
                return Mono.empty();
            }
            return Mono.just(set.blockFirst());
        }
        
        /**
         * 获取所有的值
         * @param key
         * @return
         */
        public static Flux<String> getAll(@NonNull String key) {
            return rangeByIndex(key, 0, -1);
        }
    }
}
